/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

// $Id: weclients.h 525 2010-01-19 23:18:05Z xlou $
//
/** @file */

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <map>
#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/scoped_array.hpp>

#include "bytestream.h"
//#include "we_message.h"
#include "threadsafequeue.h"
#include "rwlock_local.h"
#include "resourcemanager.h"

#define EXPORT

namespace WriteEngine
{
class WEClients
{
 public:
  /**
   * Constructors
   */
  EXPORT WEClients(int PrgmID);
  EXPORT ~WEClients();

  // static boost::mutex map_mutex;
  EXPORT void addQueue(uint32_t key);
  EXPORT void removeQueue(uint32_t key);
  EXPORT void shutdownQueue(uint32_t key);

  /** @brief read a Write Engine Server response
   *
   * Returns the next message in the inbound queue for unique ids.
   * @param bs A pointer to the ByteStream to fill in.
   * @note: saves a copy vs read(uint32_t, uint32_t).
   */
  EXPORT void read(uint32_t key, messageqcpp::SBS&);

  /** @brief write function to write to specified PM
   */
  EXPORT void write(const messageqcpp::ByteStream& msg, uint32_t connection);

  /** @brief write function to write to all PMs
   */
  EXPORT void write_to_all(const messageqcpp::ByteStream& msg);

  /** @brief Shutdown this object
   *
   * Closes all the connections created during Setup() and cleans up other stuff.
   */
  EXPORT int Close();

  /** @brief Start listening for Write Engine Server responses
   *
   * Starts the current thread listening on the client socket for Write Engine Server response messages. Will
   * not return until busy() returns false or a zero-length response is received.
   */
  EXPORT void Listen(boost::shared_ptr<messageqcpp::MessageQueueClient> client, uint32_t connIndex);

  /** @brief set/unset busy flag
   *
   * Set or unset the busy flag so Listen() can return.
   */
  EXPORT void makeBusy(bool b)
  {
    fBusy = b;
  }

  /** @brief fBusy accessor
   *
   */
  EXPORT bool Busy() const
  {
    return fBusy;
  }

  EXPORT void Setup();

  uint64_t connectedWEServers() const
  {
    return fPmConnections.size();
  }

  /** @brief accessor
   */
  uint32_t getPmCount()
  {
    return pmCount;
  }

 private:
  WEClients(const WEClients& weClient);
  WEClients& operator=(const WEClients& weClient);
  typedef std::vector<boost::thread*> ReaderList;
  typedef std::map<unsigned, boost::shared_ptr<messageqcpp::MessageQueueClient> > ClientList;

  // A queue of ByteStreams coming in from Write Engine Server
  typedef joblist::ThreadSafeQueue<messageqcpp::SBS> WESMsgQueue;

  /* To keep some state associated with the connection */
  struct MQE
  {
    MQE(uint32_t pCount) : ackSocketIndex(0), pmCount(pCount)
    {
      unackedWork.reset(new volatile uint32_t[pmCount]);
      memset((void*)unackedWork.get(), 0, pmCount * sizeof(uint32_t));
    }
    WESMsgQueue queue;
    uint32_t ackSocketIndex;
    boost::scoped_array<volatile uint32_t> unackedWork;
    uint32_t pmCount;
  };

  // The mapping of session ids to StepMsgQueueLists
  typedef std::map<unsigned, boost::shared_ptr<MQE> > MessageQueueMap;

  void StartClientListener(boost::shared_ptr<messageqcpp::MessageQueueClient> cl, uint32_t connIndex);

  /** @brief Add a message to the queue
   *
   */
  void addDataToOutput(messageqcpp::SBS, uint32_t connIndex);

  int fPrgmID;

  ClientList fPmConnections;  // all the Write Engine servers
  ReaderList fWESReader;      // all the reader threads for the pm servers
  MessageQueueMap
      fSessionMessages;  // place to put messages from the pm server to be returned by the Read method
  boost::mutex fMlock;   // sessionMessages mutex
  std::vector<boost::shared_ptr<boost::mutex> > fWlock;  // WES socket write mutexes
  bool fBusy;
  volatile uint32_t closingConnection;
  uint32_t pmCount;
  boost::mutex fOnErrMutex;  // to lock function scope to reset pmconnections under error condition

  boost::mutex ackLock;

 public:
  enum
  {
    DDLPROC = 0,
    SPLITTER,
    DMLPROC,
    BATCHINSERTPROC
  };
};

}  // namespace WriteEngine

#undef EXPORT

